package com.offcn.bigdata.spark.sql.p1

import org.apache.spark.sql.SparkSession

/**
  * SparkSQL/Hive integration example.
  * Uses SparkSQL to create Hive tables, load data into them, run a join query,
  * and persist the joined result back into Hive.
  *
  * teacher_info
  *     name, height
  *
  *     zhangsan,175
  * teacher_basic
  *     name,age,gender,course
  *
  *     zhangsan,23,false,0
  *
  *  teacher
  *     name, age, gender, height, course
  *
  * Notes on integrating SparkSQL with Hive:
  *     1. Hive's configuration (hive-site.xml) must be handed over to Spark:
  *         1) copy hive-site.xml into the conf directory of every Spark node, or
  *         2) simply put hive-site.xml on the project classpath.
  *     2. When SparkSQL runs, it automatically creates a spark-warehouse directory,
  *        which comes from this Hive property:
  *       <property>
  *         <name>hive.metastore.warehouse.dir</name>
  *         <value>/user/hive/warehouse</value>
  *       </property>
  *        If hive-site.xml does not set this parameter, a local directory named
  *        spark-warehouse is created. A common resulting error looks like:
  *        expected: file:///xxx, actual: hdfs://
  *     3. For Spark to read Hive's metadata it must load it from the metastore
  *        database, which requires the MySQL driver — i.e. add the MySQL driver
  *        jar to the project classpath.
  */
object _06SparkSQLIntegeredWithHiveOps {
    def main(args: Array[String]): Unit = {

        // Expect exactly two arguments: the HDFS paths of the teacher_info and
        // teacher_basic input files.
        if(args == null || args.length != 2) {
            println(
                """
                  |Usage: <infopath> <basicpath>
                """.stripMargin)
            System.exit(-1)
        }

        val Array(infopath, basicpath) = args

        // enableHiveSupport() connects the session to the Hive metastore
        // (requires hive-site.xml and the metastore DB driver on the classpath).
        val spark = SparkSession.builder()
                    .appName("_06SparkSQLIntegeredWithHiveOps")
                    .enableHiveSupport()
                    .getOrCreate()

        println("create database test_0829")
        // `if not exists` makes the job re-runnable: without it a second run
        // fails with AlreadyExistsException.
        spark.sql(
            """
              |create database if not exists test_0829
            """.stripMargin)
        println("========>created database test_0829 and preparing to create table teacher_info.")
        spark.sql(
            """
              |create table if not exists `test_0829`.`teacher_info` (
              |    name string,
              |    height double
              |) row format delimited
              |fields terminated by ','
            """.stripMargin)
        println("========>created table teacher_info and preparing to create table teacher_basic.")
        spark.sql(
            """
              |create table if not exists `test_0829`.`teacher_basic` (
              |    name string,
              |    age int,
              |    gender boolean,
              |    course int
              |) row format delimited
              |fields terminated by ','
            """.stripMargin)
        println("========>created table teacher_basic and preparing to load data.")
        // `load data inpath` moves the files from HDFS into the table's
        // warehouse directory.
        spark.sql(
            s"""
              |load data inpath '${infopath}' into table `test_0829`.`teacher_info`
            """.stripMargin)
        spark.sql(
            s"""
               |load data inpath '${basicpath}' into table `test_0829`.`teacher_basic`
            """.stripMargin)
        println("========>loaded data and executing join query...")

        // Join the two tables on name; gender=true is rendered as 'female',
        // false as 'male' (matches the sample row: zhangsan,23,false -> male).
        val result = spark.sql(
            """
              |select
              |  i.name,
              |  b.age,
              |  if(b.gender, 'female', 'male') gender,
              |  i.height,
              |  b.course
              |from `test_0829`.`teacher_info` i
              |inner join `test_0829`.`teacher_basic` b on i.name = b.name
            """.stripMargin)
        println("=======>executed query and saving table into hive")
        // Use the plain qualified name: embedding backticks in the string passed
        // to saveAsTable makes Spark treat the whole string as one identifier.
        result.write.saveAsTable("test_0829.teacher")
        spark.stop()
    }
}
